package com.niit.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Demo driver that reads text lines from two local sockets and joins the
 * two streams on the line text.
 */
object Spark_Stream_Join {

  /**
   * Builds the streaming context, wires up the two socket sources, joins
   * them, prints the joined stream, and then blocks until the streaming
   * context terminates.
   *
   * @param args command-line arguments; not read by this program
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("SparkStream")
    // Batch interval for the streaming context is five seconds.
    val streamingContext = new StreamingContext(conf, Seconds(5))
    streamingContext.sparkContext.setLogLevel("ERROR")

    // One text source per port, both on localhost.
    val fromPort9999 = streamingContext.socketTextStream("localhost", 9999)
    val fromPort8888 = streamingContext.socketTextStream("localhost", 8888)

    // Key every record by the line itself; the value is a constant tag
    // identifying which source the line came from.
    val taggedA = fromPort9999.map(line => (line, "A"))
    val taggedB = fromPort8888.map(line => (line, "B"))

    // Joined records have the shape (line, ("A", "B")).
    val joined: DStream[(String, (String, String))] = taggedA.join(taggedB)
    joined.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }

}
